package net;

import com.alibaba.middleware.race.sync.Constants2;
import com.alibaba.middleware.race.sync.Server;
import dev.MapEntry;
import dev.MiscUtils;
import dev.RecordMeta;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;

/**
 * Author :  Rocky
 * Date : 13/06/2017 16:23
 * Description :
 * Test :
 */
public class NetworkServer {

    private static final Logger log = LoggerFactory.getLogger(Server.class);

    private int port;

    //用来和客户端通信
    private ChannelHandlerContext chc;

    //|4byte(长度)|byte数组(传输的数据)|
    private ByteBuf sendBuffer;

    private CountDownLatch latch = new CountDownLatch(1);

    //用来关闭网络服务
    private Channel serverChannel;

    private boolean hasSendRecordMeta;

    private boolean hasSendPkStartAndEnd;


    public NetworkServer(int port) {
        this.port = port;
        Thread t = new Thread(new Runnable() {
            @Override
            public void run() {
                startNetty();
            }
        });
        t.start();

        sendBuffer = initSendBuffer();
    }

    //会放入sendBuffer
    public boolean sendInsert(long pk, List<Object> colomns) {

        //写 操作类型 (0:insert,1:update,2:delete)
        sendBuffer.writeByte(0);

        //写 pk
        sendBuffer.writeLong(pk);

        //写各个列的值
        for (Object colomn : colomns) {
            if (colomn instanceof byte[]) {

                //写 无符号数
                sendBuffer.writeShort(((byte[]) colomn).length);
                sendBuffer.writeBytes((byte[]) colomn);
            } else {
                sendBuffer.writeLong((Long) colomn);
            }
        }

        //更新长度，并检查是否发送
        updateLengthAndSendNecessary();

        return true;
    }

    //会放入sendBuffer
    public boolean sendDelete(long pk) {
        //写 操作类型 (0:insert,1:update,2:delete)
        sendBuffer.writeByte(2);

        //写 pk
        sendBuffer.writeLong(pk);

        //更新长度，并检查是否发送
        updateLengthAndSendNecessary();
        return true;
    }

    //会放入sendBuffer
    public boolean sendUpdate(long pkOldValue, long pkNewValue, Map<String, MapEntry> updatedData) {
        //写 操作类型 (0:insert,1:update,2:delete)
        sendBuffer.writeByte(1);

        //写 oldPk
        sendBuffer.writeLong(pkOldValue);

        //写 newPk
        sendBuffer.writeLong(pkNewValue);

        //写 更新的字段个数
        sendBuffer.writeByte(updatedData.size());

        //写 更新的字段
        for (Map.Entry<String, MapEntry> entry : updatedData.entrySet()) {

            String updatedColumnName = entry.getKey();

            MapEntry value = entry.getValue();

            Object oldValue = value.getKey();

            Object newValue = value.getValue();


            //写 发送更新的字段名，用byte来代替
            sendBuffer.writeByte(RecordMeta.encodeFieldName(updatedColumnName));

            //如果这个字段属于字符类型
            if (oldValue instanceof byte[]) {

                byte[] oldByteValue = (byte[]) oldValue;

                byte[] newByteValue = (byte[]) newValue;

                //写 变更前的值
                //无符号数
                sendBuffer.writeShort(oldByteValue.length);
                sendBuffer.writeBytes(oldByteValue);

                //写 变更后的值
                //无符号数
                sendBuffer.writeShort(newByteValue.length);
                sendBuffer.writeBytes(newByteValue);
            }
            //如果这个字段属于数字类型
            else {
                //写 变更前的值
                sendBuffer.writeLong((Long) oldValue);

                //写 变更后的值
                sendBuffer.writeLong((Long) newValue);
            }
        }

        //更新长度，并检查是否发送
        updateLengthAndSendNecessary();
        return true;
    }

    //直接发送，不走sendBuffer
    private boolean sendRecordMeta() {

        if (hasSendRecordMeta) {
            return true;
        }

        ByteBuf bb = RecordMeta.encode();
        try {
            long s = System.currentTimeMillis();
            latch.await();
            log.info("等待client连接耗时" + (System.currentTimeMillis() - s) + "ms");
        } catch (InterruptedException e) {
            log.error(e.getMessage(), e);
        }

        chc.writeAndFlush(bb);
        chc.flush();
        sendEndFlag();

        hasSendRecordMeta = true;
        return true;
    }

    private boolean sendPkStartAndEnd() {

        if (hasSendPkStartAndEnd) {
            return true;
        }

        ByteBuf bb = MiscUtils.directBuffer(20);
        bb.writeInt(20);
        bb.writeLong(Constants2.pkStart);
        bb.writeLong(Constants2.pkEnd);

        chc.writeAndFlush(bb);
        chc.flush();

        sendEndFlag();

        hasSendPkStartAndEnd = true;
        return true;
    }


    //数据发送完成后，或 结果集发送完成后 的结束标识符
    public boolean sendEndFlag() {
        ByteBuf endBb = MiscUtils.directBuffer(5);
        //如果长度等于1，并且value=-1的话，说明是结束符
        endBb.writeInt(5);
        endBb.writeByte(-1);

        chc.writeAndFlush(endBb);
        chc.flush();
        return true;
    }


    private void updateLengthAndSendNecessary() {
        //更新长度
        sendBuffer.setInt(0, sendBuffer.writerIndex());

        //检查是否满了，满了进行发送
        if (sendBuffer.writerIndex() >= Constants2.SEND_BUFFER_SIZE) {

            sendRecordMeta();

            sendPkStartAndEnd();

            chc.writeAndFlush(sendBuffer);
            chc.flush();
            sendBuffer = initSendBuffer();
        }
    }

    //测试时候使用
    public void flush() {

        sendRecordMeta();

        sendPkStartAndEnd();

        if (sendBuffer.getInt(0) <= 4) {
            return;
        }
        chc.writeAndFlush(sendBuffer);
        chc.flush();
        sendBuffer = initSendBuffer();
    }

    private ByteBuf initSendBuffer() {
        ByteBuf buf = MiscUtils.directBuffer(Constants2.SEND_BUFFER_SIZE);
        //初始化长度
        buf.writeInt(4);
        return buf;
    }

    //发送部分结果数据
    public boolean sendResultData(ByteBuf resultDataPart) {
        chc.writeAndFlush(resultDataPart);
        chc.flush();
        return true;
    }

    private long startTime;

    private void startNetty() {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup);
        b.channel(NioServerSocketChannel.class);
        b.option(ChannelOption.SO_BACKLOG, 128);
        b.childOption(ChannelOption.SO_KEEPALIVE, true);
        b.childHandler(new ChildChannelHandler());

        try {
            ChannelFuture f = b.bind(port).sync();
            serverChannel = f.channel();
            startTime = System.currentTimeMillis();
            serverChannel.closeFuture().sync();
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        } finally {
            log.info("网络服务关闭");
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {

        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline().addLast(new NetworkServerHandler());
        }
    }

    private class NetworkServerHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            log.info("客户端连接成功，等待连接耗时: " + (System.currentTimeMillis() - startTime) + " ms");
            chc = ctx;
            latch.countDown();
        }

        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            log.info("客户端关闭连接");
            serverChannel.close();
        }
    }

}
